Skip to content

组件 Canal

参考: https://blog.csdn.net/zhangzehai2234/article/details/135162470

介绍:阿里的一个MySQL binlog增量订阅&消费组件,可以同于于数据库增量日志解析的数据同步工具;

使用示例:

在服务端配置之前需要进行一下 配置MySQL源服务器

开启binlog:确保MySQL的my.cnf配置文件中开启了binlog。

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

创建Canal用户:在MySQL中创建一个用于Canal连接的用户,并授予必要的权限。

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

服务端配置

下载并解压Canal:从GitHub下载Canal的最新版并解压。

配置Canal:修改conf/example/instance.properties文件,设置连接到MySQL源服务器的详细信息。

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.tsdb.enable=true
canal.instance.gtidon=false

启动Canal服务:运行Canal目录下的bin/startup.sh脚本启动Canal服务。

客户端配置

引入依赖:

<dependency>
    <groupId>com.alibaba.canal</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

创建 Canal 客户端连接

在Java代码中,需要创建 Canal客户端连接,并指定Canal服务器的地址和端口号。示例代码如下:

import com.alibaba.canal.client.CanalConnector;
import com.alibaba.canal.client.CanalConnectors;

public class CanalClientExample {
    public static void main(String[] args) {
        // 创建Canal客户端连接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");
        try {
            // 连接Canal服务器
            connector.connect();
            // 订阅数据库表
            connector.subscribe(".*\\..*");
            // 处理数据变更事件
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100L); // 获取数据变更事件
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                } else {
                    // 处理数据变更事件
                    process(message.getEntries());
                    // 确认处理完成
                    connector.ack(batchId);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭Canal客户端连接
            connector.disconnect();
        }
    }
}

在实际的使用场景中,可以通过实现自定义的数据处理逻辑、使用过滤器过滤无用的数据变更事件等方式来优化数据处理效率。同时,也需要注意异常处理和性能优化等方面的问题。

集群环境

  • 准备环境:首先需要准备相应的环境,包括JDK、MySQL、Zookeeper等。其中,JDK需要使用1.8版本,MySQL用于存储Canal的元数据,Zookeeper用于实现Canal的HA和高可用性。
  • 下载安装:从Canal的GitHub发布页面下载最新的Canal二进制包,解压后放置到合适的位置。
  • 配置Canal:根据实际情况,配置Canal的参数,包括MySQL和Zookeeper的地址等。同时,需要配置Canal的sharding规则,指定哪些数据库需要同步,以及同步的数据范围等。
  • 启动Canal:启动Canal集群中的节点,每个节点都需要启动Canal Server和Canal Client两个进程。Canal Server主要负责接收数据库的增量日志,而Canal Client负责将这些日志同步到目标系统。
  • 监控维护:需要定期查看Canal的运行状态,包括节点状态、同步情况等。同时,也需要及时处理异常和问题,保证Canal的正常运行。

集群部署数据一致性保证

在Canal的集群部署中,为了保证数据的一致性,需要采取一些措施来实现。以下是一些可能的方案:

  • 分布式事务 :使用分布式事务来保证数据的一致性。当Canal集群中的节点进行数据同步时,可以借助分布式事务来确保数据的完整性和一致性。
  • 数据校验 :在数据同步过程中,对数据进行校验,确保数据的一致性。可以使用校验和、哈希等方式进行数据校验,以确保数据的准确性和完整性。
  • 故障恢复 :在节点故障或网络故障等异常情况下,需要采取相应的措施进行故障恢复,以保证数据的一致性。可以使用Zookeeper等分布式协调服务来实现故障自动恢复和数据一致性保证。
  • 数据备份和恢复 :定期对Canal集群中的数据进行备份,以防止数据丢失和损坏。在数据丢失或损坏的情况下,可以及时进行数据恢复,以保证数据的一致性。
  • 监控和维护 :定期监控Canal集群的运行状态和数据同步情况,及时发现和处理异常和问题,以保证数据的一致性。

需要注意的是,在实现数据一致性的过程中,需要考虑性能和可用性的平衡。不能为了追求数据一致性而牺牲性能和可用性。需要根据实际情况选择合适的方案,并进行充分的测试和验证。

说一下具体实现:

集群模式:多个Canal Server实例组成的集群,通过外部协调者(如Zookeeper)进行管理,实现高可用。

  • 集群模式配置:在Canal的配置文件中指定Zookeeper的地址,让Canal Server在启动时注册到Zookeeper,并监听Zookeeper的节点变化来实现自动选举和故障转移。
  • 运行原理
    • 每个Canal Server启动时,会在Zookeeper中注册自己,并尝试成为Master节点。
    • 通过Zookeeper的临时节点和选举机制,保证同一时间内只有一个Canal Server作为Master节点,负责数据的同步任务。
    • 如果当前的Master节点因故障宕机,Zookeeper会从剩余的Canal Server中选举一个新的Master节点,保证数据同步的持续性。

关于数据库之间的全量同步和增量同步

Canal 本身设计的初衷是用来进行增量数据同步,即通过模拟 MySQL Slave 的方式订阅并解析 MySQL binlog 实现实时数据变更的捕获和消费。因此,Canal 的主要能力集中在增量同步上。至于全量同步,Canal 本身并不直接支持,通常需要与其他工具结合使用来完成全量数据同步的需求。

增量同步

Canal 增量同步的核心是通过连接到 MySQL 数据库,模拟 Slave 的方式订阅 binlog,然后解析 binlog 中的数据变更事件,并将这些事件格式化后提供给下游应用或中间件进行消费。

配置步骤:

  1. 启动 MySQL 的 binlog 日志功能:确保 MySQL 开启了 binlog,并且binlog-formatROW模式,因为ROW模式下,binlog 会记录行的变化,适合数据同步。
  2. 配置 Canal 实例:在 Canal 的配置文件中,指定要同步的 MySQL 数据源信息,包括服务器地址、端口、用户名、密码等。
  3. 配置 Canal 客户端:根据业务需求,开发或配置 Canal 客户端程序,订阅 Canal 服务器解析 binlog 后的数据变更事件,并进行消费处理,比如同步到 Elasticsearch、Kafka 等。
全量同步

对于全量数据同步的需求,一种常见的做法是在进行增量数据同步之前,使用数据迁移工具(如mysqldumpmydumper/myloaderDataX等)先进行一次全量数据的导出和导入,以确保增量同步启动时,下游系统的数据是完整的。

全量+增量同步策略:

  1. 全量同步阶段:使用mysqldump等工具导出 MySQL 的全量数据,然后导入到目标系统(如另一个数据库或搜索引擎)。
  2. 开启 Canal 增量同步:在全量同步完成后,启动 Canal 进行增量同步。此时,Canal 会从全量同步结束时的 binlog 位置开始同步数据变更事件,确保数据的连续性和一致性。
  3. 增量同步处理:Canal 客户端监听 Canal 服务器,实时处理增量数据变更事件,同步到下游系统。

注意事项:

  • 在全量同步和增量同步切换过程中,需要仔细处理时间点和数据一致性的问题,确保没有数据遗漏或重复。
  • 全量同步过程可能会对源数据库性能产生影响,合理安排同步时间和优化导出导入策略是必要的。

通过这样的全量+增量同步策略,可以实现数据同步的完整流程,保证数据的完整性和实时性。

总结一下上面的内容;

  • 1、先进行全量同步(使用其他的工具,比如 mysqldump);
  • 2、再通过 Canal 进行增量同步操作(根据 binlog 位置进行订阅同步操作)